#!/usr/bin/r

gpsf<-list.files("/tmp","gps*")
gpsf<-gpsf[(as.numeric(Sys.time())-unlist(lapply(paste("/tmp/",gpsf,sep=""),file.mtime)))>100]

txt2bl<-function(indata){
	instr<-strsplit(indata,"\\.")
	ins<-data.frame(a=unlist(lapply(1:length(instr),function(i){return(unlist(instr[[i]][1]))})),b=unlist(lapply(1:length(instr),function(i){return(unlist(instr[[i]][2]))})),stringsAsFactors=FALSE)
#	ins<-matrix(unlist(strsplit(indata,"\\.")),ncol=2,byrow=TRUE)
	if(sum(is.na(ins$a),na.rm=TRUE)>0)ins[is.na(ins$a),"a"]<-"000"
	if(sum(is.na(ins$b),na.rm=TRUE)>0)ins[is.na(ins$b),"b"]<-"000"
	if(sum(nchar(ins$a)<2,na.rm=TRUE)>0)ins[nchar(ins$a)<2,"a"]<-"000"
	if(sum(nchar(ins$b)<2,na.rm=TRUE)>0)ins[nchar(ins$b)<2,"b"]<-"000"
	if(sum(nchar(ins$b)>5,na.rm=TRUE)>0)ins[nchar(ins$b)>5,"b"]<-unlist(lapply(ins[nchar(ins$b)>5,"b"],function(i){return(substr(i,1,5))}))
	insec<-as.integer(paste(ins$b,sapply(6-nchar(ins$b),function(i){return(paste0(rep("0",i),collapse=""))}),sep=""))
	inmin<-as.integer(sapply(ins$a,function(i){return(substr(i,nchar(i)-1,nchar(i)))}))+
			60*as.integer(sapply(ins$a,function(i){return(substr(i,1,nchar(i)-2))}))
	out<-inmin*1e6+insec
	return(out)
}
dbCatchWriteTable<-function(db,table,indata,nfr=500){
	ncalls<-floor(nrow(indata)/nfr)
	rowi<-1:nrow(indata)
	for(i in 0:ncalls){
		win<-i*nfr+1:nfr
		win<-win[win%in%rowi]
		dbWriteTable(db,table,indata[win,],append=TRUE)
	}
}

if(length(gpsf)>0){
	col_tel<-4
	rmcdf<-NULL
	ggsdf<-NULL
	gsvdf<-NULL
	gsadf<-NULL
	gstdf<-NULL
	grsdf<-NULL
	gbsdf<-NULL
	vtgdf<-NULL
	library(foreach)
#	library(RSQLite)
	library(doParallel)
	library(parallel)
	library(RMySQL)
	library(RSQLite)
	library(int64)
	cores<-max(c(1,detectCores()-1),na.rm=TRUE)
	if(is.infinite(cores))cores<-1
	registerDoParallel(cores)
	cl<-makeCluster(cores)
	
	indata<-unlist(parLapply(cl,paste("/tmp/",gpsf,sep=""),readLines))

	sid<-strsplit(indata,",")
	maxcol<-max(sapply(sid,length))
	clusterExport(cl,c("maxcol"))
	inmatrix<-as.data.frame(matrix(unlist(lapply(sid,function(i){out<-i;if(length(i)<maxcol){out<-c(i,rep(NA,maxcol-length(i)));};return(out)})),ncol=maxcol,nrow=length(sid),byrow=TRUE),stringsAsFactors=FALSE)
	inmatrix<-inmatrix[!is.na(inmatrix[,col_tel])&!is.na(inmatrix[,3]),]
	inmatrix<-inmatrix[rowSums(is.na(inmatrix))<(ncol(inmatrix)-5),]
	inmatrix<-inmatrix[(nchar(inmatrix[,col_tel])==5)&(nchar(inmatrix[,3])>1),]
	inmatrix<-inmatrix[grep("^G",inmatrix[,col_tel]),]
	inmatrix[,1]<-as.integer(inmatrix[,1])
	inmatrix[,2]<-as.int64(inmatrix[,2])
	inmatrix[,3]<-as.int64(inmatrix[,3])
	for(i in 2:3)inmatrix[,i]<-as.int64(inmatrix[,i])
	gc()
	

	inrmc<-inmatrix[grep("RMC",inmatrix[,col_tel]),]
	inrmc<-inrmc[(inrmc[,col_tel+2]=="A")&(inrmc[,col_tel+4]%in%c("N","S"))&(inrmc[,col_tel+6]%in%c("E","W")),]
	inrmc<-inrmc[!is.na(inrmc[,col_tel+1])&!is.na(inrmc[,col_tel+9]),]
	inrmc<-inrmc[(unlist(lapply(inrmc[,col_tel+3],function(i){return(length(unlist(strsplit(i,"\\."))))}))==2)&
				(unlist(lapply(inrmc[,col_tel+5],function(i){return(length(unlist(strsplit(i,"\\."))))}))==2),]
	if(nrow(inrmc)>0){
		clusterExport(cl,c("inrmc","col_tel"))
		rmcdf<-data.frame(uid=as.integer(inrmc[,1]),epoch=as.int64(inrmc[,3]),id=as.int64(inrmc[,2]),
			breite=txt2bl(inrmc[,col_tel+3]),laenge=txt2bl(inrmc[,col_tel+5]),velocity=round(as.numeric(inrmc[,col_tel+7])*1000),
			fix=as.numeric(inrmc[,col_tel+1]),system=substr(inrmc[,col_tel],2,2),
			fix_epoch=parSapply(cl,1:nrow(inrmc),function(i){
				rmct<-inrmc[i,col_tel+1]
				if(nchar(rmct)<6)rmct<-paste0(paste0(rep(0,6-nchar(rmct)),collapse=""),rmct)
				rmcd<-inrmc[i,col_tel+9]
				if(nchar(rmcd)<6)rmcd<-paste0(paste0(rep(0,6-nchar(rmcd)),collapse=""),rmcd)
				return(as.numeric(strptime(paste(rmcd,rmct,sep=""),format="%d%m%y%H%M%S",tz="GMT")))
			})
		,stringsAsFactors=FALSE)
		rmcdf<-rmcdf[(rmcdf$breite<=5400000000)&(rmcdf$laenge<=10800000000),]
		rm(inrmc)
		gc()
	}

	ingga<-inmatrix[grep("GGA",inmatrix[,col_tel]),]
	ingga<-ingga[(ingga[,col_tel+10]=="M")&(ingga[,col_tel+3]%in%c("N","S"))&(ingga[,col_tel+5]%in%c("E","W"))&!is.na(ingga[,col_tel+1]),]
	ingga<-ingga[!is.na(ingga[,col_tel+1])&(nchar(ingga[,col_tel+1])%in%c(5,6)),]
	if(nrow(ingga)>0){
		clusterExport(cl,c("ingga","col_tel"))
		ggadf<-data.frame(uid=as.integer(ingga[,1]),epoch=as.int64(ingga[,3]),id=as.int64(ingga[,2]),
			fix=as.numeric(ingga[,col_tel+1]),breite=txt2bl(ingga[,col_tel+2]),laenge=txt2bl(ingga[,col_tel+4]),
			sats=as.numeric(ingga[,col_tel+7]),hdop=as.numeric(ingga[,col_tel+8]),hoehe=as.numeric(ingga[,col_tel+9])*1000,
			hoehe_datum=as.numeric(ingga[,col_tel+11])*1000,
			fix_epoch=parSapply(cl,1:nrow(ingga),function(i){
			#	for(i in 1:nrow(ingga)){
				rmct<-ingga[i,col_tel+1]
				if(nchar(rmct)<6)rmct<-paste0(paste0(rep(0,6-nchar(rmct)),collapse=""),rmct)
				rmcd<-format(strptime(ingga[i,3],format="%s"),"%d%m%y")
				if(nchar(rmcd)<6)rmcd<-c(rep(0,6-nchar(rmcd)),rmcd)
				fixdat<-NA
				try(fixdat<-strptime(paste(rmcd,rmct,sep=""),format="%d%m%y%H%M%S",tz="GMT"))
				return(as.numeric(fixdat))
			})
		,stringsAsFactors=FALSE)
		ggadf<-ggadf[(ggadf$breite<=5400000000)&(ggadf$laenge<=10800000000),]
		rm(ingga)
		gc()
	}

	ingsv<-inmatrix[grep("GSV",inmatrix[,col_tel]),]
	for(i in col_tel+1:3)ingsv[,i]<-as.numeric(ingsv[,i])
	ingsv<-ingsv[(nchar(ingsv[,col_tel])==5)&!is.na(ingsv[,col_tel+1])&!is.na(ingsv[,col_tel+2])&!is.na(ingsv[,col_tel+3]),]
	ingsv<-ingsv[(ingsv[,col_tel+1]<=6)&(ingsv[,col_tel+2]<=6)&(ingsv[,col_tel+3]>0),]
	ingsv<-ingsv[rowSums(!is.na(ingsv[,col_tel+4:16]))>0,]
	if(nrow(ingsv)>0){
		ingsv$system<-substr(ingsv[,col_tel],2,2)

		gsvdf<-foreach(i=1:nrow(ingsv),.combine=rbind,.inorder=FALSE,.multicombine=TRUE,.packages=c("int64"))%dopar%{
			library(int64)
			tinv<-ingsv[i,9:ncol(ingsv)]
			tinv<-tinv[!names(tinv)%in%"system"]
			tinv<-tinv[!is.na(tinv)]
			tinv[nchar(tinv)==0]<-0
			tinv<-as.numeric(tinv)
			out<-data.frame(matrix(tinv,ncol=4,byrow=TRUE))
			names(out)<-c("PRN","elevation","azimuth","SNR")
			out$uid<-as.integer(ingsv[i,1])
			out$epoch<-as.int64(ingsv[i,2])
			out$id<-as.int64(ingsv[i,3])
			out$system<-ingsv[i,"system"]
			out$folge<-1:nrow(out)+(ingsv[i,col_tel+2]-1)*4
			return(out)}
		rm(ingsv)
		gc()
	}

	ingsa<-inmatrix[grep("GSA",inmatrix[,col_tel]),]
	ingsa<-ingsa[!is.na(ingsa[,col_tel+13]),]
	ingsa<-ingsa[!is.na(ingsa[,col_tel]),]
	ingsa<-ingsa[!is.na(as.numeric(ingsa[,col_tel+4])),]
	if(nrow(ingsa)>0){
		gsadf<-data.frame(uid=as.numeric(ingsa[,1]),epoch=as.numeric(ingsa[,2]),id=as.numeric(ingsa[,3]),
		pdop=as.numeric(ingsa[,col_tel+15]),hdop=as.numeric(ingsa[,col_tel+16]),vdop=as.numeric(ingsa[,col_tel+17]),
		system=substr(ingsa[,5],3,3),
			sats=sapply(1:nrow(ingsa),function(i){
				tg<-ingsa[i,col_tel+3:14]
				tg<-as.numeric(tg[(nchar(tg)>0)&!is.na(tg)])
				tg<-tg[!is.na(tg)]
				tg<-sort(tg)
				return(paste(tg,collapse="_"))
			})
			,stringsAsFactors=FALSE)
		gsadf<-gsadf[nchar(gsadf$sats)>0,]
		rm(ingsa)
		gc()
	}

	ingbs<-inmatrix[grep("GBS",inmatrix[,col_tel]),]
	if(nrow(ingbs)>0){
		gbsdf<-data.frame(uid=as.numeric(ingbs[,1]),epoch=as.numeric(ingbs[,2]),id=as.numeric(ingbs[,3]),
		system=substr(ingbs[,col_tel],3,3),lat_err=as.numeric(ingbs[,col_tel+2]),lon_err=as.numeric(ingbs[,col_tel+3]),
		alt_err=as.numeric(ingbs[,col_tel+4]),failing_prn=as.numeric(ingbs[,col_tel+5]),prob_fail=as.numeric(ingbs[,col_tel+6]),
		bias_fail=as.numeric(ingbs[,col_tel+7]),stringsAsFactors=FALSE)
		gbsdf<-gbsdf[rowSums(is.na(gbsdf),na.rm=TRUE)<4,]
		rm(ingbs)
		gc()
	}
	
	ingrs<-inmatrix[grep("GRS",inmatrix[,col_tel]),]
	if(nrow(ingrs)>0){
		grsdf<-data.frame(receiver=ingrs[,1],uid=as.numeric(ingrs[,2]),epoch=as.numeric(ingrs[,3]),id=as.numeric(ingrs[,4]),
		system=substr(ingrs[,col_tel],2,2),fix=as.numeric(ingrs[,col_tel+2]),fgga=as.numeric(ingrs[,col_tel+1]),stringsAsFactors=FALSE)
		for(ig in 1:12)grsdf[,paste("S",ig,sep="")]<-as.numeric(ingrs[,7+ig])
		grsdf<-grsdf[rowSums(is.na(grsdf),na.rm=TRUE)<10,]
		rm(ingrs)
		gc()
	}
	
	ingst<-inmatrix[grep("GST",inmatrix[,col_tel]),]
	if(nrow(ingst)>0){
		gstdf<-data.frame(receiver=ingst[,1],uid=as.numeric(ingst[,2]),epoch=as.numeric(ingst[,3]),id=as.numeric(ingst[,4]),
		system=substr(ingst[,col_tel],3,3),
			RMS=as.numeric(ingst[,col_tel+2]),lat_sigma=as.numeric(ingst[,col_tel+6]),lon_sigma=as.numeric(ingst[,col_tel+7]),
			height_sigma=as.numeric(ingst[,col_tel+8])
			,stringsAsFactors=FALSE)
		gstdf<-gstdf[!is.na(gstdf$RMS),]
		rm(ingst)
		gc()
	}
	invtg<-inmatrix[grep("VTG",inmatrix[,col_tel]),]
	if(nrow(invtg)>0){
		vtgdf<-data.frame(receiver=invtg[,1],uid=as.numeric(invtg[,2]),epoch=as.numeric(invtg[,3]),id=as.numeric(invtg[,4]),
		system=substr(invtg[,col_tel],2,2),
			course_true=as.numeric(invtg[,col_tel+1]),course_mag=as.numeric(invtg[,col_tel+3]),speed_kn=as.numeric(invtg[,col_tel+5]),
			speed_kmh=as.numeric(invtg[,col_tel+7])
			,stringsAsFactors=FALSE)
		rm(invtg)
		gc()
	}

#	con = dbConnect(MySQL(), user='gps', password='gps', dbname='gps', host='localhost')
	con <- dbConnect(RSQLite::SQLite(), ":memory:")
	if(!is.null(rmcdf)){
		names(rmcdf)<-toupper(names(rmcdf))
		dbCatchWriteTable(con,"gps_rmc",rmcdf)
	}
	if(!is.null(ggadf)){
		names(ggadf)<-toupper(names(ggadf))
		dbCatchWriteTable(con,"gps_gga",ggadf)
	}
	if(!is.null(gbsdf)){
		names(gbsdf)<-toupper(names(gbsdf))
		dbCatchWriteTable(con,"gps_gbs",gbsdf)
	}
	if(!is.null(gsvdf)){
		names(gsvdf)<-toupper(names(gsvdf))
		dbCatchWriteTable(con,"gps_gsv",gsvdf)
	}
	if(!is.null(gsadf)){
		names(gsadf)<-toupper(names(gsadf))
		dbCatchWriteTable(con,"gps_gsa",gsadf)
	}
	if(!is.null(gstdf)){
		names(gstdf)<-toupper(names(gstdf))
		dbCatchWriteTable(con,"gps_gst",gstdf)
	}
	if(!is.null(grsdf)){
		names(grsdf)<-toupper(names(grsdf))
		dbCatchWriteTable(con,"gps_grs",grsdf)
	}
	if(!is.null(vtgdf)){
		names(vtgdf)<-toupper(names(vtgdf))
		dbCatchWriteTable(con,"gps_vtg",vtgdf)
	}
	dbDisconnect(con)

	for(j in 0:floor(length(gpsf)/1000)){
		rgpsf<-gpsf[1:1000+j*1000]
		rgpsf<-rgpsf[!is.na(rgpsf)]
		system2("sudo",paste("rm ",paste("/tmp/",rgpsf,sep="",collapse=" "),sep=" ",collapse=" "))
	}
#	for(i in gpsf)system2("sudo",paste("rm /tmp/",i,sep=""))
}
